Project 4: Stack Overflow - askubuntu dataset
Adarsh and Ayush
The project involves extracting data from several smaller datasets and combining them to perform
analysis.
Dataset:
The dataset contains the logs of the Ask Ubuntu Stack Exchange site (https://askubuntu.com) in XML format.
The total size of the dataset is 22 GB.
It is stored in the GCS bucket gs://stackoverflow-dataset-677.
The dataset includes multiple XML files, each corresponding to a different entity in the data.
The following are the relevant features for each XML file.
Users: Reputation, CreationDate, DisplayName, WebsiteUrl, Location, Views, UpVotes,
DownVotes, AccountId
Posts: Id, PostTypeId, AcceptedAnswerId, CreationDate, Score, Body, OwnerUserId,
Title, Tags, AnswerCount, CommentCount
Comments: RowId, PostId, Score, Text, CreationDate, UserId, ContentLicense
Tags: RowId, TagName, Count, ExcerptPostId, WikiPostId
Badges: RowId, UserId, Name, Date, Class, TagBased
Votes: RowId, PostId, VoteTypeId, CreationDate
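For reference, each of these XML files stores one record per line as a self-closing <row .../> element. An entry from Users.xml looks roughly like the following (the attribute values here are made up for illustration):

```xml
<row Id="42" Reputation="1250" CreationDate="2012-03-01T10:15:00.000"
     DisplayName="example_user" Location="Example City" Views="310"
     UpVotes="57" DownVotes="3" AccountId="9001" />
```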
Tasks to be completed:
1. a) Extract user ids and usernames and store them to GCS.
b) Extract the comments, inner-join them with the users via Spark SQL, and display the result.
2. Trending users with max comments: find the users who posted the most comments and
visualize them.
3. Trending topics in comments: extract the trending topics in comments related to Ubuntu
and visualize them.
4. Automate task 1 with Apache Airflow, i.e. as a single pipeline.
Server Setup:
We used Google Cloud Dataproc to set up a PySpark cluster of 1 master node (2 vCPUs) and 6
worker nodes (1 vCPU each), with Spark 3.2, Python 3 and the Ubuntu 18.04 operating system.
For the first 3 tasks, the server setup was done manually.
Manual Setup:
Result: cluster with 1 master and 6 worker nodes
For the last task, i.e. automating task 1 with Airflow, an Airflow environment was set up using Google Cloud
Composer, which created and destroyed the Dataproc (PySpark) cluster through the Airflow scheduler.
Next, we set up a Kubernetes cluster with an n1-standard-2 master node, 3 n1-standard-2 worker
nodes, and a separate n1-standard-2 machine running the web server that hosts the Airflow GUI.
Data Processing
To create a dataframe from the XML files, we use Python's xml.etree.ElementTree module.
1a) Extracting user id and username and storing to the bucket: jupyter notebook link
Raw Data:
After cleaning, filtering and converting the data to a dataframe, we get:
Once the dataframe is created, we store the data to GCS.
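Outside Spark, the per-row extraction can be sketched in plain Python as follows; the sample row string here is hypothetical:

```python
import xml.etree.ElementTree as ET

def parse_user_row(line):
    """Parse one <row .../> line from Users.xml into [id, username]."""
    root = ET.fromstring(line.lstrip(" "))
    return [root.attrib["Id"], root.attrib["DisplayName"]]

sample = '  <row Id="42" DisplayName="example_user" Reputation="10" />'
print(parse_user_row(sample))  # ['42', 'example_user']
```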
1b) Extracting the comments and joining username with Spark SQL: jupyter notebook link
Raw Data:
After cleaning, filtering and converting the data to a dataframe, we get:
Next, we read the usernames and user ids from the CSV file created in the previous step.
To allow join queries, we cast the id columns to integer/long types.
We convert both datasets to temp views to enable SQL queries:
comments_df.createOrReplaceTempView("comments")
user_df.createOrReplaceTempView("users")
Inner join query, adding the username column to the comments dataframe:
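An inner join keeps only the comments whose userId has a match in the users table; in plain Python terms (with made-up rows) the query is equivalent to:

```python
users = {1: "alice", 2: "bob"}  # id -> username (hypothetical)
comments = [(101, 1, "works for me"), (102, 3, "same issue here")]  # (postId, userId, text)

# Inner join: unmatched userIds (here, 3) are dropped
joined = [(post_id, user_id, users[user_id], text)
          for (post_id, user_id, text) in comments
          if user_id in users]
print(joined)  # [(101, 1, 'alice', 'works for me')]
```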
2. Trending users with max comments:
After extracting the comments data and converting it to a dataframe, we get:
Next, we find the trending users with the groupBy aggregate function, sorting by count
in descending order.
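In Spark this is comments_df.groupBy("userId").count().orderBy("count", ascending=False); the same aggregation can be illustrated in plain Python with hypothetical user ids:

```python
from collections import Counter

comment_user_ids = [7, 7, 3, 7, 3, 9]  # userId of each comment (illustrative)
counts = Counter(comment_user_ids)     # same result as groupBy + count

# Sort by comment count, descending: the "trending" users come first
trending = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(trending)  # [(7, 3), (3, 2), (9, 1)]
```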
Result
Next, we create a temp view of the trending users and join it with the usernames obtained in 1a.
We store the result to GCS, import it into Google BigQuery, and export it to Google Data Studio
for visualization.
Result
3. Trending Topics in Comments:
Notebook Link:
Obtaining the data from GCS and cleaning it.
After extracting the attributes (comment text) from the XML, we get:
Next, we have to remove the stop words from this text, so we downloaded a stop-word list from the web.
Importing the stop words from file
Next, we converted the stop words into a dataframe and then into a list.
Next, we removed the stop words from the main dataframe and converted each remaining word into a (word, 1) tuple.
Next, we find the total count of each word using reduceByKey, convert the result to a dataframe,
sort it in descending order of count, and store it to GCS.
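The stop-word filtering and reduceByKey word count amount to the following (a plain-Python sketch with a toy stop-word list):

```python
from collections import Counter

stop_words = {"the", "to", "i", "a", "on"}  # toy list; the real one was downloaded
comments = ["I failed to install ubuntu", "install the driver on ubuntu"]

# Tokenize, drop stop words, then count -- Counter plays the role of reduceByKey(add)
words = [w for text in comments
         for w in text.lower().split()
         if w not in stop_words]
counts = Counter(words)
print(counts.most_common(2))  # [('install', 2), ('ubuntu', 2)]
```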
Next, we imported the data from GCS into BigQuery in order to export it to Google Data Studio.
Visualizing the data using Google Data Studio.
Result: Trending keywords: Ubuntu is the most trending keyword, with install second. This
suggests that most users comment about installation.
4. Automating Part 1 with Google Cloud Composer (Apache Airflow).
After setting up the server (shown at the beginning), we create Python scripts for parts 1 and 2 and
upload them to GCS. Their GCS paths are referenced during the final orchestration.
Part 1: users.py
from pyspark.sql import SparkSession
import xml.etree.ElementTree as ET

spark = SparkSession \
    .builder \
    .appName("My PySpark code") \
    .getOrCreate()
sc = spark.sparkContext  # SparkContext for the RDD API used below

text_file = sc.textFile("gs://stackoverflow-dataset-677/Users.xml")
# Keep only the "<row .../>" data lines, then strip the leading spaces
filteredRDD = text_file.filter(lambda x: x.startswith(" <row "))
cleanedRDD = filteredRDD.map(lambda x: x.lstrip(" "))

def parse_xml(row):
    """Extract Id and DisplayName from one <row .../> element."""
    root = ET.fromstring(row)
    user_id = root.attrib['Id']
    if user_id == "-1":  # remap the special Community user id
        user_id = "1"
    return [user_id, root.attrib['DisplayName']]

records_rdd = cleanedRDD.map(lambda x: parse_xml(x))
user_df = records_rdd.toDF(["id", "username"])
user_df.repartition(1).write.csv("gs://stackoverflow-dataset-677/users_out1", sep=',')
Part 2: comments.py
from pyspark.sql import SparkSession
import xml.etree.ElementTree as ET

spark = SparkSession \
    .builder \
    .appName("My PySpark code") \
    .getOrCreate()
sc = spark.sparkContext  # SparkContext for the RDD API used below

text_file = sc.textFile("gs://stackoverflow-dataset-677/Comments.xml")
filteredRDD = text_file.filter(lambda x: x.startswith(" <row "))
cleanedRDD = filteredRDD.map(lambda x: x.lstrip(" "))

def parse_xml(row):
    """
    Parse one <row .../> element and return its PostId, Score, Text,
    CreationDate and UserId, with defaults for missing attributes.
    """
    attrib = ET.fromstring(row).attrib
    return [
        int(attrib.get('PostId', 0)),
        int(attrib.get('Score', 0)),
        attrib.get('Text', "N/A"),
        attrib.get('CreationDate', "N/A"),
        int(attrib.get('UserId', 0)),
    ]

records_rdd = cleanedRDD.map(lambda x: parse_xml(x))
comments_df = records_rdd.toDF(["postId", "score", "text", "creationDate", "userId"])
comments_df.createOrReplaceTempView("comments")

# Read the (id, username) CSV produced by users.py
users_data = sc.textFile("gs://stackoverflow-dataset-677/users_out1/*.csv")

def create_user(line):
    # Note: assumes usernames contain no commas (no quoted CSV fields)
    parts = line.split(",")
    return [int(parts[0]), parts[1]]

users_rdd = users_data.map(lambda x: create_user(x))
user_df = users_rdd.toDF(["id", "username"])
user_df.createOrReplaceTempView("users")

comments_users_sql_df = spark.sql(
    "SELECT * FROM users u JOIN comments c ON u.id = c.userId")
# Write to a separate path so the users_out1 input is not overwritten
comments_users_sql_df.repartition(1).write.csv(
    "gs://stackoverflow-dataset-677/comments_users_out", sep=',')
Next, we created two Airflow variables for the final orchestration: the GCP project id and the zone
where the clusters will be created.
Final Orchestration: To connect the two scripts and to create and destroy the Spark cluster
automatically, we created a DAG file.
It contains the GCS paths of the users.py and comments.py files as well as the Airflow variables.
main_dag.py
# Ref: https://medium.com/analytics-vidhya/a-gentle-introduction-to-data-workflows-with-apache-airflow-and-apache-spark-6c2cd9aee573
from datetime import timedelta, datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# STEP 2: Define a start date (here, a fixed date in the past)
yesterday = datetime(2020, 12, 10)

SPARK_CODE = 'gs://stackoverflow-dataset-677/01_user.py'
SPARK_CODE2 = 'gs://stackoverflow-dataset-677/02_user_comments_join.py'
dataproc_job_name = 'extract_users_job_dataproc'
dataproc_job_name2 = 'extract_comments_join_users_dataproc'

# STEP 3: Set default arguments for the DAG
default_dag_args = {
    'start_date': yesterday,
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': models.Variable.get('project_id')
}

# STEP 4: Define the DAG: set its name, add a description, define the
# schedule interval and pass the default arguments defined above
with models.DAG(
        'comments_extract_user_join_spark_workflow',
        description='DAG for extracting comments and merging with user name',
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:

    # STEP 5: Set operators
    # BashOperator: a simple print date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
    # Create a small Dataproc cluster
    create_dataproc = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('dataproc_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')
    # Run the two PySpark jobs on that cluster
    run_spark = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark',
        main=SPARK_CODE,
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        job_name=dataproc_job_name
    )
    run_spark2 = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark2',
        main=SPARK_CODE2,
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        job_name=dataproc_job_name2
    )
    # Delete the Cloud Dataproc cluster even if the jobs fail
    delete_dataproc = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # STEP 6: Set task dependencies: each task runs after the previous one finishes
    print_date >> create_dataproc >> run_spark >> run_spark2 >> delete_dataproc
To run the DAG, we uploaded the main_dag.py file to the DAGs folder, and the script ran automatically within 2 minutes.
Main DAG file with the workflow name "comments_extract_user_join_spark_workflow".
Graph View of the Workflow
Tree View
Finally, the output file was saved in GCS.